/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
17 package org.apache.solr.handler;
18
19 import java.io.File;
20 import java.io.FileInputStream;
21 import java.io.FileNotFoundException;
22 import java.io.IOException;
23 import java.io.InputStream;
24 import java.io.InputStreamReader;
25 import java.io.OutputStream;
26 import java.lang.invoke.MethodHandles;
27 import java.nio.ByteBuffer;
28 import java.nio.channels.FileChannel;
29 import java.nio.charset.StandardCharsets;
30 import java.nio.file.NoSuchFileException;
31 import java.util.ArrayList;
32 import java.util.Arrays;
33 import java.util.Collection;
34 import java.util.Collections;
35 import java.util.Date;
36 import java.util.HashMap;
37 import java.util.List;
38 import java.util.Map;
39 import java.util.Properties;
40 import java.util.Random;
41 import java.util.concurrent.ExecutorService;
42 import java.util.concurrent.Executors;
43 import java.util.concurrent.Future;
44 import java.util.concurrent.ScheduledExecutorService;
45 import java.util.concurrent.ScheduledFuture;
46 import java.util.concurrent.TimeUnit;
47 import java.util.concurrent.atomic.AtomicBoolean;
48 import java.util.concurrent.locks.ReentrantLock;
49 import java.util.regex.Matcher;
50 import java.util.regex.Pattern;
51 import java.util.zip.Adler32;
52 import java.util.zip.Checksum;
53 import java.util.zip.DeflaterOutputStream;
54
55 import org.apache.commons.io.IOUtils;
56 import org.apache.lucene.codecs.CodecUtil;
57 import org.apache.lucene.index.DirectoryReader;
58 import org.apache.lucene.index.IndexCommit;
59 import org.apache.lucene.index.IndexDeletionPolicy;
60 import org.apache.lucene.index.IndexWriter;
61 import org.apache.lucene.index.SegmentCommitInfo;
62 import org.apache.lucene.index.SegmentInfos;
63 import org.apache.lucene.store.Directory;
64 import org.apache.lucene.store.IOContext;
65 import org.apache.lucene.store.IndexInput;
66 import org.apache.lucene.store.RateLimiter;
67 import org.apache.lucene.util.Version;
68 import org.apache.solr.common.SolrException;
69 import org.apache.solr.common.SolrException.ErrorCode;
70 import org.apache.solr.common.params.CommonParams;
71 import org.apache.solr.common.params.ModifiableSolrParams;
72 import org.apache.solr.common.params.SolrParams;
73 import org.apache.solr.common.util.ExecutorUtil;
74 import org.apache.solr.common.util.FastOutputStream;
75 import org.apache.solr.common.util.NamedList;
76 import org.apache.solr.common.util.SimpleOrderedMap;
77 import org.apache.solr.common.util.StrUtils;
78 import org.apache.solr.common.util.SuppressForbidden;
79 import org.apache.solr.core.CloseHook;
80 import org.apache.solr.core.DirectoryFactory;
81 import org.apache.solr.core.DirectoryFactory.DirContext;
82 import org.apache.solr.core.IndexDeletionPolicyWrapper;
83 import org.apache.solr.core.SolrCore;
84 import org.apache.solr.core.SolrDeletionPolicy;
85 import org.apache.solr.core.SolrEventListener;
86 import org.apache.solr.request.SolrQueryRequest;
87 import org.apache.solr.response.SolrQueryResponse;
88 import org.apache.solr.search.SolrIndexSearcher;
89 import org.apache.solr.update.SolrIndexWriter;
90 import org.apache.solr.util.DefaultSolrThreadFactory;
91 import org.apache.solr.util.NumberUtils;
92 import org.apache.solr.util.PropertiesInputStream;
93 import org.apache.solr.util.RefCounted;
94 import org.apache.solr.util.plugin.SolrCoreAware;
95 import org.slf4j.Logger;
96 import org.slf4j.LoggerFactory;
97 import org.slf4j.MDC;
98
99 import static org.apache.solr.common.params.CommonParams.NAME;

/**
 * A handler that implements Solr's HTTP master/slave index replication protocol.
 * <p>
 * On a master core it answers commands such as {@code indexversion}, {@code filelist},
 * {@code filecontent}, {@code backup}, {@code restore} and {@code details}; on a slave core
 * it owns the {@link IndexFetcher} that polls the master and pulls down changed index and
 * configuration files.  For example, {@code /replication?command=indexversion} returns the
 * version (commit timestamp) and generation of the latest replicable commit.
 */
116 public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAware {
117
118 private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
119 SolrCore core;
120
121 private volatile boolean closed = false;
122
123 private static final class CommitVersionInfo {
124 public final long version;
125 public final long generation;
126 private CommitVersionInfo(long g, long v) {
127 generation = g;
128 version = v;
129 }

    /**
     * Builds a {@link CommitVersionInfo} from an {@link IndexCommit}, using the commit's
     * generation and the commit timestamp stored in its user data as the version.
     */
135 public static CommitVersionInfo build(IndexCommit commit) {
136 long generation = commit.getGeneration();
137 long version = 0;
138 try {
139 final Map<String,String> commitData = commit.getUserData();
140 String commitTime = commitData.get(SolrIndexWriter.COMMIT_TIME_MSEC_KEY);
141 if (commitTime != null) {
142 try {
143 version = Long.parseLong(commitTime);
144 } catch (NumberFormatException e) {
            LOG.warn("Version in commitData was not formatted correctly: " + commitTime, e);
146 }
147 }
148 } catch (IOException e) {
149 LOG.warn("Unable to get version from commitData, commit: " + commit, e);
150 }
151 return new CommitVersionInfo(generation, version);
152 }
153 }
154
155 private IndexFetcher pollingIndexFetcher;
156
157 private ReentrantLock indexFetchLock = new ReentrantLock();
158
159 private ExecutorService restoreExecutor = ExecutorUtil.newMDCAwareSingleThreadExecutor(
160 new DefaultSolrThreadFactory("restoreExecutor"));
161
162 private volatile Future<Boolean> restoreFuture;
163
164 private volatile String currentRestoreName;
165
166 private String includeConfFiles;
167
168 private NamedList<String> confFileNameAlias = new NamedList<>();
169
170 private boolean isMaster = false;
171
172 private boolean isSlave = false;
173
174 private boolean replicateOnOptimize = false;
175
176 private boolean replicateOnCommit = false;
177
178 private boolean replicateOnStart = false;
179
180 private ScheduledExecutorService executorService;
181
182 private volatile long executorStartTime;
183
184 private int numberBackupsToKeep = 0;
185
186 private int numTimesReplicated = 0;
187
188 private final Map<String, FileInfo> confFileInfoCache = new HashMap<>();
189
190 private Integer reserveCommitDuration = readIntervalMs("00:00:10");
191
192 volatile IndexCommit indexCommitPoint;
193
194 volatile NamedList<Object> snapShootDetails;
195
196 private AtomicBoolean replicationEnabled = new AtomicBoolean(true);
197
198 private Long pollIntervalNs;
199 private String pollIntervalStr;
200
  // When true, the scheduled poll task skips fetching; toggled by disablePoll() / enablePoll().
204 private AtomicBoolean pollDisabled = new AtomicBoolean(false);
205
206 String getPollInterval() {
207 return pollIntervalStr;
208 }
209
210 @Override
211 public void handleRequestBody(SolrQueryRequest req, SolrQueryResponse rsp) throws Exception {
212 rsp.setHttpCaching(false);
213 final SolrParams solrParams = req.getParams();
214 String command = solrParams.get(COMMAND);
215 if (command == null) {
216 rsp.add(STATUS, OK_STATUS);
217 rsp.add("message", "No command");
218 return;
219 }
220
221
222 if (command.equals(CMD_INDEX_VERSION)) {
223 IndexCommit commitPoint = indexCommitPoint;
224
225 if (commitPoint == null) {
        // no commit point has been tracked yet (e.g. no commit event seen by this handler),
        // so fall back to the latest commit known to the deletion policy
228 commitPoint = core.getDeletionPolicy().getLatestCommit();
229 }
230
231 if (commitPoint != null && replicationEnabled.get()) {
        // A slave may take a while between reading this response and actually fetching the
        // files of this commit, so reserve the commit point for reserveCommitDuration to keep
        // the deletion policy from removing it in the meantime.
238 core.getDeletionPolicy().setReserveDuration(commitPoint.getGeneration(), reserveCommitDuration);
239 rsp.add(CMD_INDEX_VERSION, IndexDeletionPolicyWrapper.getCommitTimestamp(commitPoint));
240 rsp.add(GENERATION, commitPoint.getGeneration());
241 } else {
        // nothing to replicate (no commit yet, or replication is disabled):
        // report version 0 / generation 0
244 rsp.add(CMD_INDEX_VERSION, 0L);
245 rsp.add(GENERATION, 0L);
246 }
247 } else if (command.equals(CMD_GET_FILE)) {
248 getFileStream(solrParams, rsp);
249 } else if (command.equals(CMD_GET_FILE_LIST)) {
250 getFileList(solrParams, rsp);
251 } else if (command.equalsIgnoreCase(CMD_BACKUP)) {
252 doSnapShoot(new ModifiableSolrParams(solrParams), rsp, req);
253 rsp.add(STATUS, OK_STATUS);
254 } else if (command.equalsIgnoreCase(CMD_RESTORE)) {
255 restore(new ModifiableSolrParams(solrParams), rsp, req);
256 rsp.add(STATUS, OK_STATUS);
257 } else if (command.equalsIgnoreCase(CMD_RESTORE_STATUS)) {
258 rsp.add(CMD_RESTORE_STATUS, getRestoreStatus());
259 } else if (command.equalsIgnoreCase(CMD_DELETE_BACKUP)) {
260 deleteSnapshot(new ModifiableSolrParams(solrParams));
261 rsp.add(STATUS, OK_STATUS);
262 } else if (command.equalsIgnoreCase(CMD_FETCH_INDEX)) {
263 String masterUrl = solrParams.get(MASTER_URL);
264 if (!isSlave && masterUrl == null) {
        rsp.add(STATUS, ERR_STATUS);
        rsp.add("message", "No slave configured or no 'masterUrl' specified");
267 return;
268 }
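      // Run the fetch on a background (non-daemon) thread; the request returns immediately
      // unless wait=true was passed, in which case we block until the fetch attempt finishes.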
269 final SolrParams paramsCopy = new ModifiableSolrParams(solrParams);
270 Thread fetchThread = new Thread("explicit-fetchindex-cmd") {
271 @Override
272 public void run() {
273 doFetch(paramsCopy, false);
274 }
275 };
276 fetchThread.setDaemon(false);
277 fetchThread.start();
278 if (solrParams.getBool(WAIT, false)) {
279 fetchThread.join();
280 }
281 rsp.add(STATUS, OK_STATUS);
282 } else if (command.equalsIgnoreCase(CMD_DISABLE_POLL)) {
283 if (pollingIndexFetcher != null){
284 disablePoll();
285 rsp.add(STATUS, OK_STATUS);
286 } else {
287 rsp.add(STATUS, ERR_STATUS);
288 rsp.add("message","No slave configured");
289 }
290 } else if (command.equalsIgnoreCase(CMD_ENABLE_POLL)) {
291 if (pollingIndexFetcher != null){
292 enablePoll();
293 rsp.add(STATUS, OK_STATUS);
      } else {
        rsp.add(STATUS, ERR_STATUS);
        rsp.add("message", "No slave configured");
297 }
298 } else if (command.equalsIgnoreCase(CMD_ABORT_FETCH)) {
299 IndexFetcher fetcher = currentIndexFetcher;
300 if (fetcher != null){
301 fetcher.abortFetch();
302 rsp.add(STATUS, OK_STATUS);
303 } else {
304 rsp.add(STATUS,ERR_STATUS);
305 rsp.add("message","No slave configured");
306 }
307 } else if (command.equals(CMD_SHOW_COMMITS)) {
308 rsp.add(CMD_SHOW_COMMITS, getCommits());
309 } else if (command.equals(CMD_DETAILS)) {
310 rsp.add(CMD_DETAILS, getReplicationDetails(solrParams.getBool("slave", true)));
311 } else if (CMD_ENABLE_REPL.equalsIgnoreCase(command)) {
312 replicationEnabled.set(true);
313 rsp.add(STATUS, OK_STATUS);
314 } else if (CMD_DISABLE_REPL.equalsIgnoreCase(command)) {
315 replicationEnabled.set(false);
316 rsp.add(STATUS, OK_STATUS);
317 }
318 }
319
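  /** Deletes a named snapshot (backup); the "name" request parameter is mandatory. */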
320 private void deleteSnapshot(ModifiableSolrParams params) {
321 String name = params.get(NAME);
322 if(name == null) {
323 throw new SolrException(ErrorCode.BAD_REQUEST, "Missing mandatory param: name");
324 }
325
326 SnapShooter snapShooter = new SnapShooter(core, params.get(LOCATION), params.get(NAME));
327 snapShooter.validateDeleteSnapshot();
328 snapShooter.deleteSnapAsync(this);
329 }
330
331 private List<NamedList<Object>> getCommits() {
332 Map<Long, IndexCommit> commits = core.getDeletionPolicy().getCommits();
333 List<NamedList<Object>> l = new ArrayList<>();
334
335 for (IndexCommit c : commits.values()) {
336 try {
337 NamedList<Object> nl = new NamedList<>();
338 nl.add("indexVersion", IndexDeletionPolicyWrapper.getCommitTimestamp(c));
339 nl.add(GENERATION, c.getGeneration());
340 List<String> commitList = new ArrayList<>(c.getFileNames().size());
341 commitList.addAll(c.getFileNames());
342 Collections.sort(commitList);
343 nl.add(CMD_GET_FILE_LIST, commitList);
344 l.add(nl);
345 } catch (IOException e) {
346 LOG.warn("Exception while reading files for commit " + c, e);
347 }
348 }
349 return l;
350 }
351
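  /**
   * Computes the checksum of a file using the supplied Checksum implementation (Adler32 for
   * config files), or returns null if the file cannot be read.
   */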
352 static Long getCheckSum(Checksum checksum, File f) {
353 FileInputStream fis = null;
354 checksum.reset();
355 byte[] buffer = new byte[1024 * 1024];
356 int bytesRead;
357 try {
358 fis = new FileInputStream(f);
359 while ((bytesRead = fis.read(buffer)) >= 0)
360 checksum.update(buffer, 0, bytesRead);
361 return checksum.getValue();
362 } catch (Exception e) {
363 LOG.warn("Exception in finding checksum of " + f, e);
364 } finally {
365 IOUtils.closeQuietly(fis);
366 }
367 return null;
368 }
369
370 private volatile IndexFetcher currentIndexFetcher;
371
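  /**
   * Fetches the latest index from the master.  Only one fetch can run at a time (guarded by
   * indexFetchLock); returns false if a fetch is already in progress or the fetch fails.
   * When a masterUrl is supplied in the params a one-off IndexFetcher is created for it,
   * otherwise the configured polling fetcher is reused.
   */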
372 public boolean doFetch(SolrParams solrParams, boolean forceReplication) {
373 String masterUrl = solrParams == null ? null : solrParams.get(MASTER_URL);
374 if (!indexFetchLock.tryLock())
375 return false;
376 try {
377 if (masterUrl != null) {
378 if (currentIndexFetcher != null && currentIndexFetcher != pollingIndexFetcher) {
379 currentIndexFetcher.destroy();
380 }
381 currentIndexFetcher = new IndexFetcher(solrParams.toNamedList(), this, core);
382 } else {
383 currentIndexFetcher = pollingIndexFetcher;
384 }
385 return currentIndexFetcher.fetchLatestIndex(forceReplication);
386 } catch (Exception e) {
387 SolrException.log(LOG, "Index fetch failed ", e);
388 } finally {
389 if (pollingIndexFetcher != null) {
390 currentIndexFetcher = pollingIndexFetcher;
391 }
392 indexFetchLock.unlock();
393 }
394 return false;
395 }
396
397 boolean isReplicating() {
398 return indexFetchLock.isLocked();
399 }
400
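  /**
   * Starts an asynchronous restore of a named (or the most recent) snapshot into this core.
   * Only one restore may run at a time; progress is reported by the restorestatus command.
   */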
401 private void restore(SolrParams params, SolrQueryResponse rsp, SolrQueryRequest req) {
402 if (restoreFuture != null && !restoreFuture.isDone()) {
      throw new SolrException(ErrorCode.BAD_REQUEST, "Restore in progress. Cannot run multiple restore operations " +
          "for the same core");
405 }
406 String name = params.get(NAME);
407 String location = params.get(LOCATION);
408
    // If no location was given, the backup is assumed to live inside the data directory.
410 if (location == null) {
411 location = core.getDataDir();
412 }
413
    // If no name is given, restore from the most recent timestamped snapshot directory
    // found at the backup location.
    if (name == null) {
      File[] files = new File(location).listFiles();
      List<OldBackupDirectory> dirs = new ArrayList<>();
      if (files != null) {
        for (File f : files) {
          OldBackupDirectory obd = new OldBackupDirectory(f);
          if (obd.dir != null) {
            dirs.add(obd);
          }
        }
      }
      Collections.sort(dirs);
      if (dirs.size() == 0) {
        throw new SolrException(ErrorCode.BAD_REQUEST, "No backup name specified and none found in " + location);
      }
      name = dirs.get(0).dir.getName();
430 } else {
      // Snapshot directories are created with a "snapshot." prefix, so add it to the given name.
432 name = "snapshot." + name;
433 }
434
435 RestoreCore restoreCore = new RestoreCore(core, location, name);
436 try {
437 MDC.put("RestoreCore.core", core.getName());
438 MDC.put("RestoreCore.backupLocation", location);
439 MDC.put("RestoreCore.backupName", name);
440 restoreFuture = restoreExecutor.submit(restoreCore);
441 currentRestoreName = name;
442 } finally {
443 MDC.remove("RestoreCore.core");
444 MDC.remove("RestoreCore.backupLocation");
445 MDC.remove("RestoreCore.backupName");
446 }
447 }
448
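  /** Reports the state of the last restore request: in progress, success or failure. */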
449 private NamedList<Object> getRestoreStatus() {
450 NamedList<Object> status = new SimpleOrderedMap<>();
451
452 if (restoreFuture == null) {
453 status.add(STATUS, "No restore actions in progress");
454 return status;
455 }
456
457 status.add("snapshotName", currentRestoreName);
458 if (restoreFuture.isDone()) {
459 try {
460 boolean success = restoreFuture.get();
461 if (success) {
462 status.add(STATUS, SUCCESS);
463 } else {
464 status.add(STATUS, FAILED);
465 }
466 } catch (Exception e) {
467 status.add(STATUS, FAILED);
468 status.add(EXCEPTION, e.getMessage());
469 }
470 } else {
471 status.add(STATUS, "In Progress");
472 }
473 return status;
474 }
475
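  /**
   * Creates a snapshot (backup) of the latest commit asynchronously.  numberToKeep may only
   * be passed per request when maxNumberOfBackups was not set in the handler configuration.
   */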
476 private void doSnapShoot(SolrParams params, SolrQueryResponse rsp,
477 SolrQueryRequest req) {
478 try {
479 int numberToKeep = params.getInt(NUMBER_BACKUPS_TO_KEEP_REQUEST_PARAM, 0);
480 if (numberToKeep > 0 && numberBackupsToKeep > 0) {
481 throw new SolrException(ErrorCode.BAD_REQUEST, "Cannot use "
482 + NUMBER_BACKUPS_TO_KEEP_REQUEST_PARAM + " if "
483 + NUMBER_BACKUPS_TO_KEEP_INIT_PARAM
484 + " was specified in the configuration.");
485 }
486 numberToKeep = Math.max(numberToKeep, numberBackupsToKeep);
487 if (numberToKeep < 1) {
488 numberToKeep = Integer.MAX_VALUE;
489 }
490
491 IndexDeletionPolicyWrapper delPolicy = core.getDeletionPolicy();
492 IndexCommit indexCommit = delPolicy.getLatestCommit();
493
494 if (indexCommit == null) {
495 indexCommit = req.getSearcher().getIndexReader().getIndexCommit();
496 }
497
498
499 SnapShooter snapShooter = new SnapShooter(core, params.get("location"), params.get(NAME));
500 snapShooter.validateCreateSnapshot();
501 snapShooter.createSnapAsync(indexCommit, numberToKeep, this);
502
503 } catch (Exception e) {
504 LOG.warn("Exception during creating a snapshot", e);
505 rsp.add("exception", e);
506 }
507 }
508
  /**
   * Streams a single file back to the client in response to the filecontent command: config
   * files (cf param) via LocalFsFileStream, index files via DirectoryFileStream.
   */
516 private void getFileStream(SolrParams solrParams, SolrQueryResponse rsp) {
517 ModifiableSolrParams rawParams = new ModifiableSolrParams(solrParams);
518 rawParams.set(CommonParams.WT, FILE_STREAM);
519
520 String cfileName = solrParams.get(CONF_FILE_SHORT);
521 if (cfileName != null) {
522 rsp.add(FILE_STREAM, new LocalFsFileStream(solrParams));
523 } else {
524 rsp.add(FILE_STREAM, new DirectoryFileStream(solrParams));
525 }
526 }
527
528 @SuppressWarnings("unchecked")
529 private void getFileList(SolrParams solrParams, SolrQueryResponse rsp) {
530 String v = solrParams.get(GENERATION);
531 if (v == null) {
532 rsp.add("status", "no index generation specified");
533 return;
534 }
535 long gen = Long.parseLong(v);
536 IndexCommit commit = core.getDeletionPolicy().getCommitPoint(gen);
537
538
539 if (commit == null) {
540 rsp.add("status", "invalid index generation");
541 return;
542 }
543
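    // Reserve this commit so its files are not deleted while the slave downloads them.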
544 core.getDeletionPolicy().setReserveDuration(gen, reserveCommitDuration);
545 List<Map<String, Object>> result = new ArrayList<>();
546 Directory dir = null;
547 Version version = null;
548 try {
549 dir = core.getDirectoryFactory().get(core.getNewIndexDir(), DirContext.DEFAULT, core.getSolrConfig().indexConfig.lockType);
550 SegmentInfos infos = SegmentInfos.readCommit(dir, commit.getSegmentsFileName());
551 Version oldestVersion = Version.LUCENE_CURRENT;
552 for (SegmentCommitInfo commitInfo : infos) {
553 version = commitInfo.info.getVersion();
554 if (oldestVersion.onOrAfter(version)) {
555 oldestVersion = version;
556 }
557 for (String file : commitInfo.files()) {
558 Map<String,Object> fileMeta = new HashMap<>();
559 fileMeta.put(NAME, file);
560 fileMeta.put(SIZE, dir.fileLength(file));
561
562 try (final IndexInput in = dir.openInput(file, IOContext.READONCE)) {
563 if (version.onOrAfter(Version.LUCENE_4_8_0)) {
564 try {
565 long checksum = CodecUtil.retrieveChecksum(in);
566 fileMeta.put(CHECKSUM, checksum);
567 } catch(Exception e) {
568 LOG.warn("Could not read checksum from index file: " + file, e);
569 }
570 }
571 }
572
573 result.add(fileMeta);
574 }
575 }
576
      // add the segments_N file for this commit (with checksum when the index version supports it)
582 Map<String,Object> fileMeta = new HashMap<>();
583 fileMeta.put(NAME, infos.getSegmentsFileName());
584 fileMeta.put(SIZE, dir.fileLength(infos.getSegmentsFileName()));
585 if (infos.getId() != null) {
586 try (final IndexInput in = dir.openInput(infos.getSegmentsFileName(), IOContext.READONCE)) {
587 if (oldestVersion.onOrAfter(Version.LUCENE_4_8_0)) {
588 try {
589 fileMeta.put(CHECKSUM, CodecUtil.retrieveChecksum(in));
590 } catch(Exception e) {
591 LOG.warn("Could not read checksum from index file: " + infos.getSegmentsFileName(), e);
592 }
593 }
594 }
595 }
596 result.add(fileMeta);
597 } catch (IOException e) {
598 rsp.add("status", "unable to get file names for given index generation");
599 rsp.add(EXCEPTION, e);
600 LOG.error("Unable to get file names for indexCommit generation: " + gen, e);
601 } finally {
602 if (dir != null) {
603 try {
604 core.getDirectoryFactory().release(dir);
605 } catch (IOException e) {
606 SolrException.log(LOG, "Could not release directory after fetching file list", e);
607 }
608 }
609 }
610 rsp.add(CMD_GET_FILE_LIST, result);
611 if (confFileNameAlias.size() < 1 || core.getCoreDescriptor().getCoreContainer().isZooKeeperAware())
612 return;
613 LOG.debug("Adding config files to list: " + includeConfFiles);
614
615 rsp.add(CONF_FILES, getConfFileInfoFromCache(confFileNameAlias, confFileInfoCache));
616 }
617
  /**
   * Returns name, size, checksum (and optional alias) for each configured config file.
   * Entries are cached by last-modified time and size so checksums are only recomputed
   * when a file actually changes.
   */
625 List<Map<String, Object>> getConfFileInfoFromCache(NamedList<String> nameAndAlias,
626 final Map<String, FileInfo> confFileInfoCache) {
627 List<Map<String, Object>> confFiles = new ArrayList<>();
628 synchronized (confFileInfoCache) {
629 File confDir = new File(core.getResourceLoader().getConfigDir());
630 Checksum checksum = null;
631 for (int i = 0; i < nameAndAlias.size(); i++) {
632 String cf = nameAndAlias.getName(i);
633 File f = new File(confDir, cf);
634 if (!f.exists() || f.isDirectory()) continue;
635 FileInfo info = confFileInfoCache.get(cf);
636 if (info == null || info.lastmodified != f.lastModified() || info.size != f.length()) {
637 if (checksum == null) checksum = new Adler32();
638 info = new FileInfo(f.lastModified(), cf, f.length(), getCheckSum(checksum, f));
639 confFileInfoCache.put(cf, info);
640 }
641 Map<String, Object> m = info.getAsMap();
642 if (nameAndAlias.getVal(i) != null) m.put(ALIAS, nameAndAlias.getVal(i));
643 confFiles.add(m);
644 }
645 }
646 return confFiles;
647 }
648
649 static class FileInfo {
650 long lastmodified;
651 String name;
652 long size;
653 long checksum;
654
    public FileInfo(long lastmodified, String name, long size, long checksum) {
      this.lastmodified = lastmodified;
657 this.name = name;
658 this.size = size;
659 this.checksum = checksum;
660 }
661
662 Map<String, Object> getAsMap() {
663 Map<String, Object> map = new HashMap<>();
664 map.put(NAME, name);
665 map.put(SIZE, size);
666 map.put(CHECKSUM, checksum);
667 return map;
668 }
669 }
670
671 void disablePoll() {
672 if (isSlave) {
673 pollDisabled.set(true);
674 LOG.info("inside disable poll, value of pollDisabled = " + pollDisabled);
675 }
676 }
677
678 void enablePoll() {
679 if (isSlave) {
680 pollDisabled.set(false);
681 LOG.info("inside enable poll, value of pollDisabled = " + pollDisabled);
682 }
683 }
684
685 boolean isPollingDisabled() {
686 return pollDisabled.get();
687 }
688
689 @SuppressForbidden(reason = "Need currentTimeMillis, to output next execution time in replication details")
690 private void markScheduledExecutionStart() {
691 executorStartTime = System.currentTimeMillis();
692 }
693
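  /** Estimates the next poll time from when the last scheduled poll started plus the poll interval. */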
694 private Date getNextScheduledExecTime() {
695 Date nextTime = null;
696 if (executorStartTime > 0)
697 nextTime = new Date(executorStartTime + TimeUnit.MILLISECONDS.convert(pollIntervalNs, TimeUnit.NANOSECONDS));
698 return nextTime;
699 }
700
701 int getTimesReplicatedSinceStartup() {
702 return numTimesReplicated;
703 }
704
705 void setTimesReplicatedSinceStartup() {
706 numTimesReplicated++;
707 }
708
709 long getIndexSize() {
710 Directory dir;
711 long size = 0;
712 try {
713 dir = core.getDirectoryFactory().get(core.getIndexDir(), DirContext.DEFAULT, core.getSolrConfig().indexConfig.lockType);
714 try {
715 size = DirectoryFactory.sizeOfDirectory(dir);
716 } finally {
717 core.getDirectoryFactory().release(dir);
718 }
719 } catch (IOException e) {
720 SolrException.log(LOG, "IO error while trying to get the size of the Directory", e);
721 }
722 return size;
723 }
724
725 @Override
726 public String getDescription() {
727 return "ReplicationHandler provides replication of index and configuration files from Master to Slaves";
728 }
729
  /** Returns the version (commit timestamp) and generation of the current searcher's index commit. */
733 private CommitVersionInfo getIndexVersion() {
734 CommitVersionInfo v = null;
735 RefCounted<SolrIndexSearcher> searcher = core.getSearcher();
736 try {
737 v = CommitVersionInfo.build(searcher.get().getIndexReader().getIndexCommit());
738 } catch (IOException e) {
739 LOG.warn("Unable to get index commit: ", e);
740 } finally {
741 searcher.decref();
742 }
743 return v;
744 }
745
746 @Override
747 @SuppressWarnings("unchecked")
748 public NamedList getStatistics() {
749 NamedList list = super.getStatistics();
750 if (core != null) {
751 list.add("indexSize", NumberUtils.readableSize(getIndexSize()));
752 CommitVersionInfo vInfo = (core != null && !core.isClosed()) ? getIndexVersion(): null;
753 list.add("indexVersion", null == vInfo ? 0 : vInfo.version);
754 list.add(GENERATION, null == vInfo ? 0 : vInfo.generation);
755
756 list.add("indexPath", core.getIndexDir());
757 list.add("isMaster", String.valueOf(isMaster));
758 list.add("isSlave", String.valueOf(isSlave));
759
760 IndexFetcher fetcher = currentIndexFetcher;
761 if (fetcher != null) {
762 list.add(MASTER_URL, fetcher.getMasterUrl());
763 if (getPollInterval() != null) {
764 list.add(POLL_INTERVAL, getPollInterval());
765 }
766 list.add("isPollingDisabled", String.valueOf(isPollingDisabled()));
767 list.add("isReplicating", String.valueOf(isReplicating()));
768 long elapsed = fetcher.getReplicationTimeElapsed();
769 long val = fetcher.getTotalBytesDownloaded();
770 if (elapsed > 0) {
771 list.add("timeElapsed", elapsed);
772 list.add("bytesDownloaded", val);
773 list.add("downloadSpeed", val / elapsed);
774 }
775 Properties props = loadReplicationProperties();
776 addVal(list, IndexFetcher.PREVIOUS_CYCLE_TIME_TAKEN, props, Long.class);
777 addVal(list, IndexFetcher.INDEX_REPLICATED_AT, props, Date.class);
778 addVal(list, IndexFetcher.CONF_FILES_REPLICATED_AT, props, Date.class);
779 addVal(list, IndexFetcher.REPLICATION_FAILED_AT, props, Date.class);
780 addVal(list, IndexFetcher.TIMES_FAILED, props, Integer.class);
781 addVal(list, IndexFetcher.TIMES_INDEX_REPLICATED, props, Integer.class);
782 addVal(list, IndexFetcher.LAST_CYCLE_BYTES_DOWNLOADED, props, Long.class);
783 addVal(list, IndexFetcher.TIMES_CONFIG_REPLICATED, props, Integer.class);
784 addVal(list, IndexFetcher.CONF_FILES_REPLICATED, props, String.class);
785 }
786 if (isMaster) {
787 if (includeConfFiles != null) list.add("confFilesToReplicate", includeConfFiles);
788 list.add(REPLICATE_AFTER, getReplicateAfterStrings());
789 list.add("replicationEnabled", String.valueOf(replicationEnabled.get()));
790 }
791 }
792 return list;
793 }
794
  /**
   * Assembles the report for the details command: index stats, master-side replication
   * configuration and, optionally, slave-side fetch progress.
   */
798 private NamedList<Object> getReplicationDetails(boolean showSlaveDetails) {
799 NamedList<Object> details = new SimpleOrderedMap<>();
800 NamedList<Object> master = new SimpleOrderedMap<>();
801 NamedList<Object> slave = new SimpleOrderedMap<>();
802
803 details.add("indexSize", NumberUtils.readableSize(getIndexSize()));
804 details.add("indexPath", core.getIndexDir());
805 details.add(CMD_SHOW_COMMITS, getCommits());
806 details.add("isMaster", String.valueOf(isMaster));
807 details.add("isSlave", String.valueOf(isSlave));
808 CommitVersionInfo vInfo = getIndexVersion();
809 details.add("indexVersion", null == vInfo ? 0 : vInfo.version);
810 details.add(GENERATION, null == vInfo ? 0 : vInfo.generation);
811
812 IndexCommit commit = indexCommitPoint;
813
814 if (isMaster) {
815 if (includeConfFiles != null) master.add(CONF_FILES, includeConfFiles);
816 master.add(REPLICATE_AFTER, getReplicateAfterStrings());
817 master.add("replicationEnabled", String.valueOf(replicationEnabled.get()));
818 }
819
820 if (isMaster && commit != null) {
821 CommitVersionInfo repCommitInfo = CommitVersionInfo.build(commit);
822 master.add("replicableVersion", repCommitInfo.version);
823 master.add("replicableGeneration", repCommitInfo.generation);
824 }
825
826 IndexFetcher fetcher = currentIndexFetcher;
827 if (fetcher != null) {
828 Properties props = loadReplicationProperties();
829 if (showSlaveDetails) {
830 try {
831 NamedList nl = fetcher.getDetails();
832 slave.add("masterDetails", nl.get(CMD_DETAILS));
833 } catch (Exception e) {
834 LOG.warn(
835 "Exception while invoking 'details' method for replication on master ",
836 e);
837 slave.add(ERR_STATUS, "invalid_master");
838 }
839 }
840 slave.add(MASTER_URL, fetcher.getMasterUrl());
841 if (getPollInterval() != null) {
842 slave.add(POLL_INTERVAL, getPollInterval());
843 }
844 Date nextScheduled = getNextScheduledExecTime();
845 if (nextScheduled != null && !isPollingDisabled()) {
846 slave.add(NEXT_EXECUTION_AT, nextScheduled.toString());
847 } else if (isPollingDisabled()) {
848 slave.add(NEXT_EXECUTION_AT, "Polling disabled");
849 }
850 addVal(slave, IndexFetcher.INDEX_REPLICATED_AT, props, Date.class);
851 addVal(slave, IndexFetcher.INDEX_REPLICATED_AT_LIST, props, List.class);
852 addVal(slave, IndexFetcher.REPLICATION_FAILED_AT_LIST, props, List.class);
853 addVal(slave, IndexFetcher.TIMES_INDEX_REPLICATED, props, Integer.class);
854 addVal(slave, IndexFetcher.CONF_FILES_REPLICATED, props, Integer.class);
855 addVal(slave, IndexFetcher.TIMES_CONFIG_REPLICATED, props, Integer.class);
856 addVal(slave, IndexFetcher.CONF_FILES_REPLICATED_AT, props, Integer.class);
857 addVal(slave, IndexFetcher.LAST_CYCLE_BYTES_DOWNLOADED, props, Long.class);
858 addVal(slave, IndexFetcher.TIMES_FAILED, props, Integer.class);
859 addVal(slave, IndexFetcher.REPLICATION_FAILED_AT, props, Date.class);
860 addVal(slave, IndexFetcher.PREVIOUS_CYCLE_TIME_TAKEN, props, Long.class);
861
862 slave.add("currentDate", new Date().toString());
863 slave.add("isPollingDisabled", String.valueOf(isPollingDisabled()));
864 boolean isReplicating = isReplicating();
865 slave.add("isReplicating", String.valueOf(isReplicating));
866 if (isReplicating) {
867 try {
868 long bytesToDownload = 0;
869 List<String> filesToDownload = new ArrayList<>();
870 for (Map<String, Object> file : fetcher.getFilesToDownload()) {
871 filesToDownload.add((String) file.get(NAME));
872 bytesToDownload += (Long) file.get(SIZE);
873 }
874
875
876 for (Map<String, Object> file : fetcher.getConfFilesToDownload()) {
877 filesToDownload.add((String) file.get(NAME));
878 bytesToDownload += (Long) file.get(SIZE);
879 }
880
881 slave.add("filesToDownload", filesToDownload);
882 slave.add("numFilesToDownload", String.valueOf(filesToDownload.size()));
883 slave.add("bytesToDownload", NumberUtils.readableSize(bytesToDownload));
884
885 long bytesDownloaded = 0;
886 List<String> filesDownloaded = new ArrayList<>();
887 for (Map<String, Object> file : fetcher.getFilesDownloaded()) {
888 filesDownloaded.add((String) file.get(NAME));
889 bytesDownloaded += (Long) file.get(SIZE);
890 }
891
892
893 for (Map<String, Object> file : fetcher.getConfFilesDownloaded()) {
894 filesDownloaded.add((String) file.get(NAME));
895 bytesDownloaded += (Long) file.get(SIZE);
896 }
897
898 Map<String, Object> currentFile = fetcher.getCurrentFile();
899 String currFile = null;
900 long currFileSize = 0, currFileSizeDownloaded = 0;
901 float percentDownloaded = 0;
902 if (currentFile != null) {
903 currFile = (String) currentFile.get(NAME);
904 currFileSize = (Long) currentFile.get(SIZE);
905 if (currentFile.containsKey("bytesDownloaded")) {
906 currFileSizeDownloaded = (Long) currentFile.get("bytesDownloaded");
907 bytesDownloaded += currFileSizeDownloaded;
              if (currFileSize > 0)
                percentDownloaded = (currFileSizeDownloaded * 100.0f) / currFileSize;
910 }
911 }
912 slave.add("filesDownloaded", filesDownloaded);
913 slave.add("numFilesDownloaded", String.valueOf(filesDownloaded.size()));
914
915 long estimatedTimeRemaining = 0;
916
917 Date replicationStartTimeStamp = fetcher.getReplicationStartTimeStamp();
918 if (replicationStartTimeStamp != null) {
919 slave.add("replicationStartTime", replicationStartTimeStamp.toString());
920 }
921 long elapsed = fetcher.getReplicationTimeElapsed();
922 slave.add("timeElapsed", String.valueOf(elapsed) + "s");
923
924 if (bytesDownloaded > 0)
925 estimatedTimeRemaining = ((bytesToDownload - bytesDownloaded) * elapsed) / bytesDownloaded;
926 float totalPercent = 0;
927 long downloadSpeed = 0;
          if (bytesToDownload > 0)
            totalPercent = (bytesDownloaded * 100.0f) / bytesToDownload;
930 if (elapsed > 0)
931 downloadSpeed = (bytesDownloaded / elapsed);
932 if (currFile != null)
933 slave.add("currentFile", currFile);
934 slave.add("currentFileSize", NumberUtils.readableSize(currFileSize));
935 slave.add("currentFileSizeDownloaded", NumberUtils.readableSize(currFileSizeDownloaded));
936 slave.add("currentFileSizePercent", String.valueOf(percentDownloaded));
937 slave.add("bytesDownloaded", NumberUtils.readableSize(bytesDownloaded));
938 slave.add("totalPercent", String.valueOf(totalPercent));
939 slave.add("timeRemaining", String.valueOf(estimatedTimeRemaining) + "s");
940 slave.add("downloadSpeed", NumberUtils.readableSize(downloadSpeed));
941 } catch (Exception e) {
942 LOG.error("Exception while writing replication details: ", e);
943 }
944 }
945 }
946
947 if (isMaster)
948 details.add("master", master);
949 if (slave.size() > 0)
950 details.add("slave", slave);
951
952 NamedList snapshotStats = snapShootDetails;
953 if (snapshotStats != null)
954 details.add(CMD_BACKUP, snapshotStats);
955
956 return details;
957 }
958
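  /** Copies a replication property into the response, rendering Date and List values from their stored epoch-millis form. */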
959 private void addVal(NamedList<Object> nl, String key, Properties props, Class clzz) {
960 String s = props.getProperty(key);
961 if (s == null || s.trim().length() == 0) return;
962 if (clzz == Date.class) {
963 try {
964 Long l = Long.parseLong(s);
965 nl.add(key, new Date(l).toString());
966 } catch (NumberFormatException e) { }
967 } else if (clzz == List.class) {
968 String ss[] = s.split(",");
969 List<String> l = new ArrayList<>();
970 for (String s1 : ss) {
971 l.add(new Date(Long.valueOf(s1)).toString());
972 }
973 nl.add(key, l);
974 } else {
975 nl.add(key, s);
976 }
977
978 }
979
980 private List<String> getReplicateAfterStrings() {
981 List<String> replicateAfter = new ArrayList<>();
982 if (replicateOnCommit)
983 replicateAfter.add("commit");
984 if (replicateOnOptimize)
985 replicateAfter.add("optimize");
986 if (replicateOnStart)
987 replicateAfter.add("startup");
988 return replicateAfter;
989 }
990
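  /**
   * Loads the replication properties file (IndexFetcher.REPLICATION_PROPERTIES) from the data
   * directory, returning an empty Properties if it has not been written yet.
   */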
991 Properties loadReplicationProperties() {
992 Directory dir = null;
993 try {
994 try {
995 dir = core.getDirectoryFactory().get(core.getDataDir(),
996 DirContext.META_DATA, core.getSolrConfig().indexConfig.lockType);
997 IndexInput input;
998 try {
999 input = dir.openInput(
1000 IndexFetcher.REPLICATION_PROPERTIES, IOContext.DEFAULT);
1001 } catch (FileNotFoundException | NoSuchFileException e) {
1002 return new Properties();
1003 }
1004
1005 try {
1006 final InputStream is = new PropertiesInputStream(input);
1007 Properties props = new Properties();
1008 props.load(new InputStreamReader(is, StandardCharsets.UTF_8));
1009 return props;
1010 } finally {
1011 input.close();
1012 }
1013 } finally {
1014 if (dir != null) {
1015 core.getDirectoryFactory().release(dir);
1016 }
1017 }
1018 } catch (IOException e) {
1019 throw new SolrException(ErrorCode.SERVER_ERROR, e);
1020 }
1021 }
1022
  /**
   * Starts the slave-side poll timer.  The interval is given in HH:mm:ss form; each tick
   * calls doFetch() unless polling has been disabled.
   */
1031 private void setupPolling(String intervalStr) {
1032 pollIntervalStr = intervalStr;
1033 pollIntervalNs = readIntervalNs(pollIntervalStr);
1034 if (pollIntervalNs == null || pollIntervalNs <= 0) {
      LOG.info("No value set for 'pollInterval'. Timer task not started.");
1036 return;
1037 }
1038
1039 Runnable task = new Runnable() {
1040 @Override
1041 public void run() {
1042 if (pollDisabled.get()) {
1043 LOG.info("Poll disabled");
1044 return;
1045 }
1046 try {
1047 LOG.debug("Polling for index modifications");
1048 markScheduledExecutionStart();
1049 doFetch(null, false);
1050 } catch (Exception e) {
1051 LOG.error("Exception in fetching index", e);
1052 }
1053 }
1054 };
1055 executorService = Executors.newSingleThreadScheduledExecutor(
1056 new DefaultSolrThreadFactory("indexFetcher"));
1057
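    // Stagger the first poll with a pseudo-random offset so that slaves started together do
    // not all hit the master at the same instant.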
1058 long initialDelayNs = new Random().nextLong() % pollIntervalNs
1059 + TimeUnit.NANOSECONDS.convert(1, TimeUnit.MILLISECONDS);
1060 executorService.scheduleAtFixedRate(task, initialDelayNs, pollIntervalNs, TimeUnit.NANOSECONDS);
1061 LOG.info("Poll scheduled at an interval of {}ms",
1062 TimeUnit.MILLISECONDS.convert(pollIntervalNs, TimeUnit.NANOSECONDS));
1063 }
1064
1065 @Override
1066 @SuppressWarnings("unchecked")
1067 public void inform(SolrCore core) {
1068 this.core = core;
1069 registerCloseHook();
1070 Object nbtk = initArgs.get(NUMBER_BACKUPS_TO_KEEP_INIT_PARAM);
1071 if(nbtk!=null) {
1072 numberBackupsToKeep = Integer.parseInt(nbtk.toString());
1073 } else {
1074 numberBackupsToKeep = 0;
1075 }
1076 NamedList slave = (NamedList) initArgs.get("slave");
1077 boolean enableSlave = isEnabled( slave );
1078 if (enableSlave) {
1079 currentIndexFetcher = pollingIndexFetcher = new IndexFetcher(slave, this, core);
1080 setupPolling((String) slave.get(POLL_INTERVAL));
1081 isSlave = true;
1082 }
1083 NamedList master = (NamedList) initArgs.get("master");
1084 boolean enableMaster = isEnabled( master );
1085
1086 if (enableMaster || enableSlave) {
1087 if (core.getCoreDescriptor().getCoreContainer().getZkController() != null) {
1088 LOG.warn("SolrCloud is enabled for core " + core.getName() + " but so is old-style replication. Make sure you" +
1089 " intend this behavior, it usually indicates a mis-configuration. Master setting is " +
1090 Boolean.toString(enableMaster) + " and slave setting is " + Boolean.toString(enableSlave));
1091 }
1092 }
1093
1094 if (!enableSlave && !enableMaster) {
1095 enableMaster = true;
1096 master = new NamedList<>();
1097 }
1098
1099 if (enableMaster) {
1100 includeConfFiles = (String) master.get(CONF_FILES);
1101 if (includeConfFiles != null && includeConfFiles.trim().length() > 0) {
1102 List<String> files = Arrays.asList(includeConfFiles.split(","));
1103 for (String file : files) {
1104 if (file.trim().length() == 0) continue;
1105 String[] strs = file.trim().split(":");
          // strs[0] is the file name, strs[1] (if present) is the alias to use on the slave
1107 confFileNameAlias.add(strs[0], strs.length > 1 ? strs[1] : null);
1108 }
1109 LOG.info("Replication enabled for following config files: " + includeConfFiles);
1110 }
1111 List backup = master.getAll("backupAfter");
1112 boolean backupOnCommit = backup.contains("commit");
1113 boolean backupOnOptimize = !backupOnCommit && backup.contains("optimize");
1114 List replicateAfter = master.getAll(REPLICATE_AFTER);
1115 replicateOnCommit = replicateAfter.contains("commit");
1116 replicateOnOptimize = !replicateOnCommit && replicateAfter.contains("optimize");
1117
1118 if (!replicateOnCommit && ! replicateOnOptimize) {
1119 replicateOnCommit = true;
1120 }
1121
      // If we only replicate after optimize, make sure the deletion policy keeps at least one
      // optimized (single-segment) commit around for slaves to fetch.
1124 if (replicateOnOptimize) {
1125 IndexDeletionPolicyWrapper wrapper = core.getDeletionPolicy();
1126 IndexDeletionPolicy policy = wrapper == null ? null : wrapper.getWrappedDeletionPolicy();
1127 if (policy instanceof SolrDeletionPolicy) {
1128 SolrDeletionPolicy solrPolicy = (SolrDeletionPolicy)policy;
1129 if (solrPolicy.getMaxOptimizedCommitsToKeep() < 1) {
1130 solrPolicy.setMaxOptimizedCommitsToKeep(1);
1131 }
1132 } else {
1133 LOG.warn("Replication can't call setMaxOptimizedCommitsToKeep on " + policy);
1134 }
1135 }
1136
1137 if (replicateOnOptimize || backupOnOptimize) {
1138 core.getUpdateHandler().registerOptimizeCallback(getEventListener(backupOnOptimize, replicateOnOptimize));
1139 }
1140 if (replicateOnCommit || backupOnCommit) {
1141 replicateOnCommit = true;
1142 core.getUpdateHandler().registerCommitCallback(getEventListener(backupOnCommit, replicateOnCommit));
1143 }
1144 if (replicateAfter.contains("startup")) {
1145 replicateOnStart = true;
1146 RefCounted<SolrIndexSearcher> s = core.getNewestSearcher(false);
1147 try {
1148 DirectoryReader reader = s==null ? null : s.get().getIndexReader();
1149 if (reader!=null && reader.getIndexCommit() != null && reader.getIndexCommit().getGeneration() != 1L) {
1150 try {
1151 if(replicateOnOptimize){
1152 Collection<IndexCommit> commits = DirectoryReader.listCommits(reader.directory());
1153 for (IndexCommit ic : commits) {
1154 if(ic.getSegmentCount() == 1){
1155 if(indexCommitPoint == null || indexCommitPoint.getGeneration() < ic.getGeneration()) indexCommitPoint = ic;
1156 }
1157 }
1158 } else{
1159 indexCommitPoint = reader.getIndexCommit();
1160 }
1161 } finally {
              // nothing to do: the deletion policy always keeps the latest commit point
              // (and the latest optimized one when required), so it need not be saved here
1169 }
1170 }
1171
1172
1173 RefCounted<IndexWriter> iw = core.getUpdateHandler().getSolrCoreState().getIndexWriter(core);
1174 iw.decref();
1175
1176 } catch (IOException e) {
1177 LOG.warn("Unable to get IndexCommit on startup", e);
1178 } finally {
1179 if (s!=null) s.decref();
1180 }
1181 }
1182 String reserve = (String) master.get(RESERVE);
1183 if (reserve != null && !reserve.trim().equals("")) {
1184 reserveCommitDuration = readIntervalMs(reserve);
1185 }
1186 LOG.info("Commits will be reserved for " + reserveCommitDuration);
1187 isMaster = true;
1188 }
1189 }
1190
1191
  private boolean isEnabled(NamedList params) {
    if (params == null) return false;
    Object enable = params.get("enable");
    if (enable == null) return true;
    if (enable instanceof String)
      return StrUtils.parseBool((String) enable);
    return Boolean.TRUE.equals(enable);
  }
1200
  /**
   * Registers close hooks that shut down the poll executor, destroy the active IndexFetcher
   * and stop the restore executor when the core closes.
   */
1204 private void registerCloseHook() {
1205 core.addCloseHook(new CloseHook() {
1206 @Override
1207 public void preClose(SolrCore core) {
1208 try {
1209 if (executorService != null) executorService.shutdown();
1210 } finally {
1211 if (pollingIndexFetcher != null) {
1212 pollingIndexFetcher.destroy();
1213 }
1214 }
1215 if (currentIndexFetcher != null && currentIndexFetcher != pollingIndexFetcher) {
1216 currentIndexFetcher.destroy();
1217 }
1218 }
1219
1220 @Override
1221 public void postClose(SolrCore core) {}
1222 });
1223
1224 core.addCloseHook(new CloseHook() {
1225 @Override
1226 public void preClose(SolrCore core) {
1227 ExecutorUtil.shutdownAndAwaitTermination(restoreExecutor);
1228 if (restoreFuture != null) {
1229 restoreFuture.cancel(false);
1230 }
1231 }
1232
1233 @Override
1234 public void postClose(SolrCore core) {}
1235 });
1236 }
1237
  /**
   * Returns a listener that is registered as a commit and/or optimize callback: it records
   * the new commit point for replication (getCommit) and/or triggers an asynchronous
   * snapshot (snapshoot).
   */
1246 private SolrEventListener getEventListener(final boolean snapshoot, final boolean getCommit) {
1247 return new SolrEventListener() {
1248 @Override
1249 public void init(NamedList args) { }
1250
      /** Called after each commit/optimize: records the latest commit point and optionally snapshots it. */
1254 @Override
1255 public void postCommit() {
1256 IndexCommit currentCommitPoint = core.getDeletionPolicy().getLatestCommit();
1257
1258 if (getCommit) {
1259
1260 indexCommitPoint = currentCommitPoint;
1272 }
1273 if (snapshoot) {
1274 try {
1275 int numberToKeep = numberBackupsToKeep;
1276 if (numberToKeep < 1) {
1277 numberToKeep = Integer.MAX_VALUE;
1278 }
1279 SnapShooter snapShooter = new SnapShooter(core, null, null);
1280 snapShooter.validateCreateSnapshot();
1281 snapShooter.createSnapAsync(currentCommitPoint, numberToKeep, ReplicationHandler.this);
1282 } catch (Exception e) {
1283 LOG.error("Exception while snapshooting", e);
1284 }
1285 }
1286 }
1287
1288 @Override
1289 public void newSearcher(SolrIndexSearcher newSearcher, SolrIndexSearcher currentSearcher) { }
1290
1291 @Override
1292 public void postSoftCommit() {
1293
1294 }
1295 };
1296 }
1297
  /**
   * Streams an index file to the client as a sequence of packets: a 4-byte length, an
   * optional 8-byte Adler32 checksum, then the bytes; a zero-length packet marks end of file.
   */
1301 private class DirectoryFileStream implements SolrCore.RawWriter {
1302 protected SolrParams params;
1303
1304 protected FastOutputStream fos;
1305
1306 protected Long indexGen;
1307 protected IndexDeletionPolicyWrapper delPolicy;
1308
1309 protected String fileName;
1310 protected String cfileName;
1311 protected String sOffset;
1312 protected String sLen;
1313 protected String compress;
1314 protected boolean useChecksum;
1315
1316 protected long offset = -1;
1317 protected int len = -1;
1318
1319 protected Checksum checksum;
1320
1321 private RateLimiter rateLimiter;
1322
1323 byte[] buf;
1324
1325 public DirectoryFileStream(SolrParams solrParams) {
1326 params = solrParams;
1327 delPolicy = core.getDeletionPolicy();
1328
1329 fileName = params.get(FILE);
1330 cfileName = params.get(CONF_FILE_SHORT);
1331 sOffset = params.get(OFFSET);
1332 sLen = params.get(LEN);
1333 compress = params.get(COMPRESSION);
1334 useChecksum = params.getBool(CHECKSUM, false);
1335 indexGen = params.getLong(GENERATION, null);
1336 if (useChecksum) {
1337 checksum = new Adler32();
1338 }
1339
1340 double maxWriteMBPerSec = params.getDouble(MAX_WRITE_PER_SECOND, Double.MAX_VALUE);
1341 rateLimiter = new RateLimiter.SimpleRateLimiter(maxWriteMBPerSec);
1342 }
1343
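    /**
     * Parses offset/len, allocates the packet buffer and, when a generation was requested,
     * saves that commit point so it is not deleted while its files are being streamed.
     */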
1344 protected void initWrite() throws IOException {
1345 if (sOffset != null) offset = Long.parseLong(sOffset);
1346 if (sLen != null) len = Integer.parseInt(sLen);
1347 if (fileName == null && cfileName == null) {
        // neither an index file nor a config file was requested; nothing to stream
1349 writeNothingAndFlush();
1350 }
1351 buf = new byte[(len == -1 || len > PACKET_SZ) ? PACKET_SZ : len];
      // Keep the requested commit point around while its file is streamed; it is released
      // again in extendReserveAndReleaseCommitPoint().
      if (indexGen != null) {
1355 delPolicy.saveCommitPoint(indexGen);
1356 }
1357 }
1358
1359 protected void createOutputStream(OutputStream out) {
1360 if (Boolean.parseBoolean(compress)) {
1361 fos = new FastOutputStream(new DeflaterOutputStream(out));
1362 } else {
1363 fos = new FastOutputStream(out);
1364 }
1365 }
1366
1367 protected void extendReserveAndReleaseCommitPoint() {
      if (indexGen != null) {
        // Reserve the commit point a little longer (the slave may come straight back for the
        // next file), then release the explicit save taken in initWrite().
        delPolicy.setReserveDuration(indexGen, reserveCommitDuration);
        delPolicy.releaseCommitPoint(indexGen);
1376 }
1377
1378 }
1379 public void write(OutputStream out) throws IOException {
1380 createOutputStream(out);
1381
1382 IndexInput in = null;
1383 try {
1384 initWrite();
1385
1386 RefCounted<SolrIndexSearcher> sref = core.getSearcher();
1387 Directory dir;
1388 try {
1389 SolrIndexSearcher searcher = sref.get();
1390 dir = searcher.getIndexReader().directory();
1391 } finally {
1392 sref.decref();
1393 }
1394 in = dir.openInput(fileName, IOContext.READONCE);
1395
1396 if (offset != -1) in.seek(offset);
1397
1398 long filelen = dir.fileLength(fileName);
1399 long maxBytesBeforePause = 0;
1400
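        // Packet format: 4-byte length, optional 8-byte Adler32 checksum, then the bytes.
        // After the last (short) packet a zero-length packet is written to mark end of file.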
1401 while (true) {
1402 offset = offset == -1 ? 0 : offset;
1403 int read = (int) Math.min(buf.length, filelen - offset);
1404 in.readBytes(buf, 0, read);
1405
1406 fos.writeInt(read);
1407 if (useChecksum) {
1408 checksum.reset();
1409 checksum.update(buf, 0, read);
1410 fos.writeLong(checksum.getValue());
1411 }
1412 fos.write(buf, 0, read);
1413 fos.flush();
1414 LOG.debug("Wrote {} bytes for file {}", offset + read, fileName);
1415
1416
1417 maxBytesBeforePause += read;
1418 if (maxBytesBeforePause >= rateLimiter.getMinPauseCheckBytes()) {
1419 rateLimiter.pause(maxBytesBeforePause);
1420 maxBytesBeforePause = 0;
1421 }
1422 if (read != buf.length) {
1423 writeNothingAndFlush();
1424 fos.close();
1425 break;
1426 }
1427 offset += read;
1428 in.seek(offset);
1429 }
1430 } catch (IOException e) {
1431 LOG.warn("Exception while writing response for params: " + params, e);
1432 } finally {
1433 if (in != null) {
1434 in.close();
1435 }
1436 extendReserveAndReleaseCommitPoint();
1437 }
1438 }
1439
    /**
     * Writes a zero-length packet: used as the end-of-file marker and as the response when
     * there is nothing to send.
     */
1444 protected void writeNothingAndFlush() throws IOException {
1445 fos.writeInt(0);
1446 fos.flush();
1447 }
1448 }
1449
  /** Streams a configuration file from the core's conf directory on the local file system. */
1452 private class LocalFsFileStream extends DirectoryFileStream {
1453
1454 public LocalFsFileStream(SolrParams solrParams) {
1455 super(solrParams);
1456 }
1457
1458 @Override
1459 public void write(OutputStream out) throws IOException {
1460 createOutputStream(out);
1461 FileInputStream inputStream = null;
1462 try {
1463 initWrite();
1464
        // config files are read from the core's conf directory
1466 File file = new File(core.getResourceLoader().getConfigDir(), cfileName);
1467
1468 if (file.exists() && file.canRead()) {
1469 inputStream = new FileInputStream(file);
1470 FileChannel channel = inputStream.getChannel();
1471
1472 if (offset != -1)
1473 channel.position(offset);
1474 ByteBuffer bb = ByteBuffer.wrap(buf);
1475
1476 while (true) {
1477 bb.clear();
1478 long bytesRead = channel.read(bb);
1479 if (bytesRead <= 0) {
1480 writeNothingAndFlush();
1481 fos.close();
1482 break;
1483 }
1484 fos.writeInt((int) bytesRead);
1485 if (useChecksum) {
1486 checksum.reset();
1487 checksum.update(buf, 0, (int) bytesRead);
1488 fos.writeLong(checksum.getValue());
1489 }
1490 fos.write(buf, 0, (int) bytesRead);
1491 fos.flush();
1492 }
1493 } else {
1494 writeNothingAndFlush();
1495 }
1496 } catch (IOException e) {
1497 LOG.warn("Exception while writing response for params: " + params, e);
1498 } finally {
1499 IOUtils.closeQuietly(inputStream);
1500 extendReserveAndReleaseCommitPoint();
1501 }
1502 }
1503 }
1504
1505 private static Integer readIntervalMs(String interval) {
1506 return (int) TimeUnit.MILLISECONDS.convert(readIntervalNs(interval), TimeUnit.NANOSECONDS);
1507 }
1508
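  /** Parses an interval in HH:mm:ss form (INTERVAL_PATTERN) into nanoseconds; returns null for a null input. */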
1509 private static Long readIntervalNs(String interval) {
1510 if (interval == null)
1511 return null;
1512 int result = 0;
1513 Matcher m = INTERVAL_PATTERN.matcher(interval.trim());
1514 if (m.find()) {
1515 String hr = m.group(1);
1516 String min = m.group(2);
1517 String sec = m.group(3);
1518 result = 0;
1519 try {
1520 if (sec != null && sec.length() > 0)
1521 result += Integer.parseInt(sec);
1522 if (min != null && min.length() > 0)
1523 result += (60 * Integer.parseInt(min));
1524 if (hr != null && hr.length() > 0)
1525 result += (60 * 60 * Integer.parseInt(hr));
1526 return TimeUnit.NANOSECONDS.convert(result, TimeUnit.SECONDS);
1527 } catch (NumberFormatException e) {
1528 throw new SolrException(ErrorCode.SERVER_ERROR, INTERVAL_ERR_MSG);
1529 }
1530 } else {
1531 throw new SolrException(ErrorCode.SERVER_ERROR, INTERVAL_ERR_MSG);
1532 }
1533 }
1534
1535 private static final String LOCATION = "location";
1536
1537 private static final String SUCCESS = "success";
1538
1539 private static final String FAILED = "failed";
1540
1541 private static final String EXCEPTION = "exception";
1542
1543 public static final String MASTER_URL = "masterUrl";
1544
1545 public static final String STATUS = "status";
1546
1547 public static final String COMMAND = "command";
1548
1549 public static final String CMD_DETAILS = "details";
1550
1551 public static final String CMD_BACKUP = "backup";
1552
1553 public static final String CMD_RESTORE = "restore";
1554
1555 public static final String CMD_RESTORE_STATUS = "restorestatus";
1556
1557 public static final String CMD_FETCH_INDEX = "fetchindex";
1558
1559 public static final String CMD_ABORT_FETCH = "abortfetch";
1560
1561 public static final String CMD_GET_FILE_LIST = "filelist";
1562
1563 public static final String CMD_GET_FILE = "filecontent";
1564
1565 public static final String CMD_DISABLE_POLL = "disablepoll";
1566
1567 public static final String CMD_DISABLE_REPL = "disablereplication";
1568
1569 public static final String CMD_ENABLE_REPL = "enablereplication";
1570
1571 public static final String CMD_ENABLE_POLL = "enablepoll";
1572
1573 public static final String CMD_INDEX_VERSION = "indexversion";
1574
1575 public static final String CMD_SHOW_COMMITS = "commits";
1576
1577 public static final String CMD_DELETE_BACKUP = "deletebackup";
1578
1579 public static final String GENERATION = "generation";
1580
1581 public static final String OFFSET = "offset";
1582
1583 public static final String LEN = "len";
1584
1585 public static final String FILE = "file";
1586
1587 public static final String SIZE = "size";
1588
1589 public static final String MAX_WRITE_PER_SECOND = "maxWriteMBPerSec";
1590
1591 public static final String CONF_FILE_SHORT = "cf";
1592
1593 public static final String CHECKSUM = "checksum";
1594
1595 public static final String ALIAS = "alias";
1596
1597 public static final String CONF_CHECKSUM = "confchecksum";
1598
1599 public static final String CONF_FILES = "confFiles";
1600
1601 public static final String REPLICATE_AFTER = "replicateAfter";
1602
1603 public static final String FILE_STREAM = "filestream";
1604
1605 public static final String POLL_INTERVAL = "pollInterval";
1606
1607 public static final String INTERVAL_ERR_MSG = "The " + POLL_INTERVAL + " must be in this format 'HH:mm:ss'";
1608
1609 private static final Pattern INTERVAL_PATTERN = Pattern.compile("(\\d*?):(\\d*?):(\\d*)");
1610
1611 public static final int PACKET_SZ = 1024 * 1024;
1612
1613 public static final String RESERVE = "commitReserveDuration";
1614
1615 public static final String COMPRESSION = "compression";
1616
1617 public static final String EXTERNAL = "external";
1618
1619 public static final String INTERNAL = "internal";
1620
1621 public static final String ERR_STATUS = "ERROR";
1622
1623 public static final String OK_STATUS = "OK";
1624
1625 public static final String NEXT_EXECUTION_AT = "nextExecutionAt";
1626
1627 public static final String NUMBER_BACKUPS_TO_KEEP_REQUEST_PARAM = "numberToKeep";
1628
1629 public static final String NUMBER_BACKUPS_TO_KEEP_INIT_PARAM = "maxNumberOfBackups";
1630
  /**
   * Boolean request parameter for the fetchindex command: when true, the request blocks
   * until the fetch attempt completes instead of returning immediately.
   */
1640 public static final String WAIT = "wait";
1641 }